adbe8850f5e14dae21bb65e53586048b6bc3f2dd,cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/co/cask/cdap/etl/planner/PipelinePlanner.java,PipelinePlanner,dagToPipeline,#Dag#Set#Map#,246

Before Change


      // add other plugin types
      StageSpec spec = specs.get(stageName);
      String pluginType = spec.getPlugin().getType();
      StageInfo stageInfo = new StageInfo(stageName, spec.getInputs(), spec.getInputSchemas(), spec.getOutputs(),
                                          spec.getOutputSchema(), spec.getErrorDatasetName());
      phaseBuilder.addStage(pluginType, stageInfo);
    }

After Change


  private PipelinePhase dagToPipeline(Dag dag, Set<String> connectors, Map<String, StageSpec> specs) {
    PipelinePhase.Builder phaseBuilder = PipelinePhase.builder(supportedPluginTypes);

    for (String stageName : dag.getTopologicalOrder()) {
      Set<String> outputs = dag.getNodeOutputs(stageName);
      if (!outputs.isEmpty()) {
        phaseBuilder.addConnections(stageName, outputs);
      }

      // add connectors
      if (connectors.contains(stageName)) {
        phaseBuilder.addStage(StageInfo.builder(stageName, Constants.CONNECTOR_TYPE).build());
        continue;
      }

      // add other plugin types
      StageSpec spec = specs.get(stageName);
      String pluginType = spec.getPlugin().getType();
      phaseBuilder.addStage(StageInfo.builder(stageName, pluginType)
                              .addInputs(spec.getInputs())
                              .addInputSchemas(spec.getInputSchemas())
                              .addOutputs(spec.getOutputs())
                              .setOutputSchema(spec.getOutputSchema())